package com.example.testpulsar.mq.runner;

import com.alibaba.fastjson.JSONObject;
import com.example.testpulsar.mq.factory.ConsumerFactory;
import com.example.testpulsar.mq.messagepojo.Msg;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class ConsumerThread implements Runnable {

    @Resource
    ConsumerFactory consumerFactory;

    @Override
    public void run() {
        MessageListener<String> listener = (consumer, msg) -> {
            String value = msg.getValue();
            Msg msgObj = JSONObject.parseObject(value, Msg.class);
            System.out.println(msgObj);
        };
        try {
            consumerFactory.createWXGZHHistoryListAcqConsumer(listener);
        } catch (PulsarClientException e) {
            e.printStackTrace();
            System.out.println("创建消费者异常");
        }
    }
}
